-
Notifications
You must be signed in to change notification settings - Fork 222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[client] RecordAccumulator's ready method will be blocked if no batch is ready. #322
base: main
Are you sure you want to change the base?
Conversation
597ddd3
to
8b54e99
Compare
@wuchong @swuferhong @luoyuxia , CC |
8b54e99
to
8ff3741
Compare
rebase |
accum.ready(cluster, 500L); | ||
}); | ||
assertThat(future).isNotCompleted(); | ||
Thread.sleep(1000L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use future.get()
or future.join()
to wait async result. Never use sleep as it is not guaranteed to finish in the sleep time in different testing environment, and makes the tests unstable.
@@ -391,6 +392,41 @@ void testAwaitFlushComplete() throws Exception { | |||
assertThatThrownBy(accum::awaitFlushCompletion).isInstanceOf(InterruptedException.class); | |||
} | |||
|
|||
@Test | |||
void testReadyWithTimeOut() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the purpose of this test? Do we need to check the accumulator.ready
result?
Maybe we can take Kafka's RecordAccumulatorTest#testNextReadyCheckDelay
as an example.
return ready(cluster, batchTimeoutMs); | ||
} | ||
|
||
public ReadyCheckResult ready(Cluster cluster, long timeoutMs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to provide this method? only testing purpose? If yes, why not use the batchTimeoutMs
parameter in tests?
final int dequeSize; | ||
final boolean full; | ||
|
||
boolean exhausted = writerBufferPool.queued() > 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be moved out of batches foreach, as it requires to obtain a lock.
inLock( | ||
batchReadyLock, | ||
() -> { | ||
if (timeoutMs > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be checked before lock .
@@ -532,6 +550,7 @@ private RecordAppendResult tryAppend( | |||
WriteBatch last = deque.peekLast(); | |||
if (last != null) { | |||
boolean success = last.tryAppend(writeRecord, callback); | |||
notifyBatchReady(deque); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why here need to notify batch ready?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wuchong If not try to notify batch after each insert, the RecordAccumulator#ready will be blocked until timeout. I use use a producer-consumer-mode in RecordAccumulator, which block in consumer and signal by producer.
Maybe I do same thing as kafka, accumulator.ready return the min wait time of each bucket, and then sender suspends with Thread.sleep(nextReadyCheckDelayMs)
and doesn't signal by consumer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean why notify for each record? Not when a batch is ready? It will result in busy loop that the sender doesn't wait at all.
Purpose
Linked issue: close #306
Tests
com.alibaba.fluss.client.write.RecordAccumulatorTest#testReadyWithTimeOut